{
  "cells": [
    {
      "cell_type": "markdown",
      "metadata": {
        "colab_type": "text",
        "id": "view-in-github"
      },
      "source": [
        "<a href=\"https://colab.research.google.com/github/apache/beam/blob/master/examples/notebooks/get-started/learn_beam_basics_by_doing.ipynb\" target=\"_parent\"><img src=\"https://colab.research.google.com/assets/colab-badge.svg\" alt=\"Open In Colab\"/></a>"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "source": [
        "#@title ###### Licensed to the Apache Software Foundation (ASF), Version 2.0 (the \"License\")\n",
        "\n",
        "# Licensed to the Apache Software Foundation (ASF) under one\n",
        "# or more contributor license agreements. See the NOTICE file\n",
        "# distributed with this work for additional information\n",
        "# regarding copyright ownership. The ASF licenses this file\n",
        "# to you under the Apache License, Version 2.0 (the\n",
        "# \"License\"); you may not use this file except in compliance\n",
        "# with the License. You may obtain a copy of the License at\n",
        "#\n",
        "#   http://www.apache.org/licenses/LICENSE-2.0\n",
        "#\n",
        "# Unless required by applicable law or agreed to in writing,\n",
        "# software distributed under the License is distributed on an\n",
        "# \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n",
        "# KIND, either express or implied. See the License for the\n",
        "# specific language governing permissions and limitations\n",
        "# under the License."
      ],
      "outputs": [],
      "metadata": {
        "cellView": "form"
      }
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "V913rQcmLS72"
      },
      "source": [
        "# Welcome to Apache Beam!\n",
        "\n",
        "This notebook will be your introductory guide to Beam's main concepts and its uses. This tutorial **does not** assume any prior Apache Beam knowledge.\n",
        "\n",
        "We'll cover what Beam is, what it does, and a few basic transforms!\n",
        "\n",
        "We aim to give you familiarity with:\n",
        "- Creating a `Pipeline`\n",
        "- Creating a `PCollection`\n",
        "- Performing basic `PTransforms`\n",
        "  - Map\n",
        "  - Filter\n",
        "  - FlatMap\n",
        "  - Combine\n",
        "- Applications\n"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "Lduh-9oXt3P_"
      },
      "source": [
        "## How To Approach This Tutorial\n",
        "\n",
        "This tutorial was designed for someone who likes to **learn by doing**. As such, there will be opportunities for you to practice writing your own code in these notebooks with the answer hidden in a cell below.\n",
        "\n",
        "Codes that require editing will be with an `...` and each cell title will say `Edit This Code`. However, you are free to play around with the other cells if you would like to add something beyond our tutorial.\n",
        "\n",
        "It may be tempting to just copy and paste solutions, but even if you do look at the Answer cells, try typing out the solutions manually. The muscle memory will be very helpful.\n",
        "\n",
        "> Tip: For those who would like to learn concepts more from the ground up, check out the [Tour of Beam](https://tour.beam.apache.org/)!"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "42B-64Lvef3K"
      },
      "source": [
        "## Prerequisites\n",
        "\n",
        "We'll assume you have familiarity with Python or Pandas, but you should be able to follow along even if you’re coming from a different programming language. We'll also assume you understand programming concepts like functions, objects, arrays, and dictionaries.\n",
        "\n"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "SeDD0nardXyL"
      },
      "source": [
        "## Running CoLab\n",
        "\n",
        "To navigate through different sections, use the table of contents. From View drop-down list, select Table of contents.\n",
        "\n",
        "To run a code cell, you can click the Run cell button at the top left of the cell, or by select it and press `Shift+Enter`. Try modifying a code cell and re-running it to see what happens."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "UdMPnMDDkGc8"
      },
      "source": [
        "To begin, we have to set up our environment. Let's install and import Apache Beam by running the cell below."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "5cPHukaOgDDM"
      },
      "outputs": [],
      "source": [
        "# Remember: You can press shift+enter to run this cell\n",
        "!pip install --quiet apache-beam\n",
        "import apache_beam as beam"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "30l8_MD-undP"
      },
      "outputs": [],
      "source": [
        "# Set the logging level to reduce verbose information\n",
        "import logging\n",
        "\n",
        "logging.root.setLevel(logging.ERROR)\n"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "gyoo1gLKtZmU"
      },
      "source": [
        "\n",
        "\n",
        "---\n",
        "\n",
        "\n",
        "\n",
        "---\n",
        "\n"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "N29KJTfdMCtB"
      },
      "source": [
        "# What is Apache Beam?"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "B2evhzEuMu8a"
      },
      "source": [
        "Apache Beam is a library for data processing. It is often used for [Extract-Transform-Load (ETL)](https://en.wikipedia.org/wiki/Extract,_transform,_load) jobs, where we:\n",
        "1. *Extract* from a data source\n",
        "2. *Transform* that data\n",
        "3. *Load* that data into a data sink (like a database)\n",
        "\n",
        "Apache Beam makes these jobs easy with the ability to process everything at the same time and its unified model and open-source SDKs. There are many more parts of Beam, but throughout these tutorials, we will break down each part to show you how they will all fit together.\n",
        "\n",
        "For this tutorial, you will use these Beam SDKs to build your own `Pipeline` to process your data.\n",
        "\n",
        "Below, we will run through creating the heart of the `Pipeline`. There are three main abstractions in Beam:\n",
        "1. `Pipeline`\n",
        "2. `PCollection`\n",
        "3. `PTransform`"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "WecDJbfqpWb1"
      },
      "source": [
        "\n",
        "\n",
        "---\n",
        "\n",
        "\n",
        "---\n",
        "\n",
        "\n"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "FditMJAIAp9Q"
      },
      "source": [
        "# 1. Design Your Pipeline"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "_xYj6e3Hvt1W"
      },
      "source": [
        "## 1.1 What is a Pipeline"
      ]
    },
    {
      "attachments": {},
      "cell_type": "markdown",
      "metadata": {
        "id": "bVJCh9FLBtU9"
      },
      "source": [
        "A `Pipeline` describes the whole cycle of your data processing task, starting from the data sources to the processing transforms you will apply to them until your desired output.\n",
        "\n",
        "`Pipeline` is responsible for reading, processing, and saving the data. \n",
        "Each `PTransform` is done on or outputs a `PCollection`, and this process is done in your `Pipeline`.\n",
        "More glossary details can be found at [here](https://beam.apache.org/documentation/glossary/).\n",
        "\n",
        "A diagram of this process is shown below:\n",
        "\n",
        "<img src='https://beam.apache.org/images/design-your-pipeline-linear.svg'>"
      ]
    },
    {
      "attachments": {},
      "cell_type": "markdown",
      "metadata": {
        "id": "cyn8_mB6zN4m"
      },
      "source": [
        "In code, this process will look like this:\n",
        "\n",
        "\n",
        "```\n",
        "# Each `step` represents a specific transform. After `step3`, it will save the data reference to `outputs`.\n",
        "outputs = pipeline | step1 | step2 | step3\n",
        "```\n",
        "\n",
        ">The pipe operator `|`  applies the `PTransform` on the right side of the pipe to the input `PCollection`.\n"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "gUyk2UWypI7g"
      },
      "source": [
        "Pipelines can quickly grow long, so it's sometimes easier to read if we surround them with parentheses and break them into multiple lines.\n",
        "\n",
        "```\n",
        "# This is equivalent to the example above.\n",
        "outputs = (\n",
        "  pipeline\n",
        "  | step1\n",
        "  | step2\n",
        "  | step3\n",
        ")\n",
        "```\n",
        "\n"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "7hmlbjPlvZVY"
      },
      "source": [
        "Sometimes, the transform names aren't very descriptive. Beam allows each transform, or step, to have a unique label, or description. This makes it a lot easier to debug, and it's in general a good practice to start.\n",
        "\n",
        "> You can use the right shift operator `>>` to add a label to your transforms, like `'My description' >> MyTransform`.\n",
        "\n",
        "```\n",
        "# Try to give short but descriptive labels.\n",
        "# These serve both as comments and help debug later on.\n",
        "outputs = (\n",
        "  pipeline\n",
        "  | 'First step' >> step1\n",
        "  | 'Second step' >> step2\n",
        "  | 'Third step' >> step3\n",
        ")\n",
        "```"
      ]
    },
    {
      "attachments": {},
      "cell_type": "markdown",
      "metadata": {
        "id": "GXQ__Kwxvr3J"
      },
      "source": [
        "## 1.2 Loading Our Data\n",
        "\n",
        "Now, you can try to write your own pipeline!\n",
        "\n",
        "First, let's load the example data we will be using throughout this tutorial into our file directory. This [dataset](https://archive.ics.uci.edu/ml/datasets/SMS+Spam+Collection) consists of a **collection of SMS messages in English tagged as either \"spam\" or \"ham\" (a legitimate SMS\n",
        ")**.\n",
        "\n",
        "For this tutorial, we will create a pipeline to **explore the dataset using Beam to count words in SMS messages that contain spam or ham**."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "BkZ0wxsBPuvO"
      },
      "outputs": [],
      "source": [
        "# Creates a data directory with our dataset SMSSpamCollection\n",
        "!mkdir -p data\n",
        "!gsutil cp gs://apachebeamdt/SMSSpamCollection data/"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "TiHFDIMnLTbm"
      },
      "source": [
        "**What does the data look like?**\n",
        "\n",
        "This dataset is a `txt` file with 5,574 rows and 4 columns recording the following attributes:\n",
        "1. `Column 1`: The label (either `ham` or `spam`)\n",
        "2. `Column 2`: The SMS as raw text (type `string`)"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "! head data/SMSSpamCollection"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "r-0WM38KNaTI"
      },
      "source": [
        "## 1.3 Writing Our Own Pipeline\n",
        "\n",
        "Now that we understand our dataset, let's go into creating our pipeline.\n"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "1Ajz7-WQCVfj"
      },
      "source": [
        "To initialize a `Pipeline`, you first assign your pipeline `beam.Pipeline()` to a name. Assign your pipeline to the name, `pipeline`, in the code cell below."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "cXm0n80E9ZwT"
      },
      "outputs": [],
      "source": [
        "#@title Edit This Code Cell\n",
        "..."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "cellView": "form",
        "id": "q0gzEcos9hjp"
      },
      "outputs": [],
      "source": [
        "#@title Answer\n",
        "pipeline = beam.Pipeline()"
      ]
    },
    {
      "attachments": {},
      "cell_type": "markdown",
      "metadata": {
        "id": "-K6tpdVK9KzC"
      },
      "source": [
        "This pipeline will be where we create our transformed `PCollection`. In Beam, your data lives in a `PCollection`, which stands for `Parallel Collection`.\n",
        "\n",
        "A **PCollection** is like a list of elements, but without any order guarantees. This allows Beam to easily parallelize and distribute the `PCollection`'s elements.\n",
        "\n",
        "Now, let's use one of Beam's `Read` transforms to turn our text file (our dataset) into a `PCollection`. "
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "MS0XWqTiAIlE"
      },
      "source": [
        "## 1.4 Reading from Text File\n",
        "\n",
        "We can use the\n",
        "[`ReadFromText`](https://beam.apache.org/releases/pydoc/current/apache_beam.io.textio.html#apache_beam.io.textio.ReadFromText)\n",
        "transform to read text files into `str` elements.\n",
        "\n",
        "It takes a\n",
        "[_glob pattern_](https://en.wikipedia.org/wiki/Glob_%28programming%29)\n",
        "as an input, and reads all the files that match that pattern. For example, in the pattern `data/*.txt`, the `*` is a wildcard that matches anything. This pattern matches all the files in the `data/` directory with a `.txt` extension. It then **returns one element for each line** in the file.\n"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "PXUSzLhhBwdr"
      },
      "source": [
        "Because we only want this pipeline to read the `SMSSpamCollection` file that's in the `data/` directory, we will specify the input pattern to be `'data/SMSSpamCollection'`.\n",
        "\n",
        "We will then use that input pattern with our transform `beam.io.ReadFromText()` and apply it onto our pipeline. The `beam.io.ReadFromText()` transform can take in an input pattern as an input."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "HYs5697QEArB"
      },
      "outputs": [],
      "source": [
        "#@title Hint\n",
        "# If you get stuck on the syntax, use the Table of Contents to navigate to 1.1\n",
        "# What is a Pipeline and reread that section."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "p5z6C65tEmtM"
      },
      "outputs": [],
      "source": [
        "#@title Edit This Code\n",
        "\n",
        "inputs_pattern = 'data/SMSSpamCollection'\n",
        "\n",
        "pipeline = beam.Pipeline()\n",
        "\n",
        "outputs = (\n",
        "  pipeline\n",
        "  | beam.io.ReadFromText(inputs_pattern)\n",
        ")"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "cellView": "form",
        "id": "zC9blgIXBv5A"
      },
      "outputs": [],
      "source": [
        "#@title Answer\n",
        "inputs_pattern = 'data/SMSSpamCollection'\n",
        "\n",
        "pipeline = beam.Pipeline()\n",
        "\n",
        "# Remember: | is the apply function in Beam in Python\n",
        "outputs = (\n",
        "    pipeline\n",
        "      # Remember to add short descriptions to your transforms for good practice and easier understanding\n",
        "      | 'Take in Dataset' >> beam.io.ReadFromText(inputs_pattern)\n",
        ")"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "ISm7PF9wE7Ts"
      },
      "source": [
        "## 1.5 Writing to Text File\n",
        "\n",
        "Now, how do we know if we did it correctly? Let's take a look at the text file you just read.\n",
        "\n",
        "You may have noticed that we can't simply `print` the output `PCollection` to see the elements. In Beam, you can __NOT__ access the elements from a `PCollection` directly like a Python list.\n",
        "\n",
        "This is because, depending on the runner,\n",
        "the `PCollection` elements might live in multiple worker machines.\n",
        "\n",
        "However, we can see our output `PCollection` by using a [`WriteToText`](https://beam.apache.org/releases/pydoc/2.27.0/apache_beam.io.textio.html#apache_beam.io.textio.WriteToText) transform to turn our `str` elements into a `txt` file (or another file type of your choosing) and then running a command to show the head of our output file."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "iz73k9d1IUGw"
      },
      "source": [
        "> `beam.io.WriteToText` takes a _file path prefix_ as an input, and it writes the all `str` elements into one or more files with filenames starting with that prefix.\n",
        "> You can optionally pass a `file_name_suffix` as well, usually used for the file extension.\n",
        "> Each element goes into its own line in the output files."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "D80HrbeKFxCv"
      },
      "source": [
        "Now, you can try it. Save the results to a file path prefix `'output'` and make the file_name_suffix `'.txt'`"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "v6RvVVINKOt9"
      },
      "outputs": [],
      "source": [
        "#@title Edit This Code\n",
        "inputs_pattern = 'data/SMSSpamCollection'\n",
        "\n",
        "pipeline = beam.Pipeline()\n",
        "\n",
        "# Remember: | is the apply function in Beam in Python\n",
        "outputs = (\n",
        "    pipeline\n",
        "      | 'Take in Dataset' >> beam.io.ReadFromText(inputs_pattern)\n",
        "      | 'Write results' >> beam.io.WriteToText(..., file_name_suffix = ...)\n",
        "      # To see the results from the previous transform\n",
        "      | 'Print the text file name' >> beam.Map(print) # or beam.LogElements()\n",
        ")\n",
        "\n",
        "# To run the pipeline\n",
        "pipeline.run()\n",
        "\n",
        "# The command used to view your output txt file.\n",
        "# If you choose to save the file path prefix to a different location or change the file type,\n",
        "# you have to update this command as well.\n",
        "! head output*.txt"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "cellView": "form",
        "id": "nRYxclPaJArp"
      },
      "outputs": [],
      "source": [
        "#@title Answer\n",
        "inputs_pattern = 'data/SMSSpamCollection'\n",
        "\n",
        "pipeline = beam.Pipeline()\n",
        "\n",
        "outputs = (\n",
        "    pipeline\n",
        "      | 'Take in Dataset' >> beam.io.ReadFromText(inputs_pattern)\n",
        "      # ADDED\n",
        "      | 'Write results' >> beam.io.WriteToText(\"ansoutput1\", file_name_suffix = \".txt\")\n",
        "      | 'Print the text file name' >> beam.Map(print)\n",
        ")\n",
        "\n",
        "pipeline.run()\n",
        "\n",
        "# The file this data is saved to is called \"ansoutput1\" as seen in the WriteToText transform.\n",
        "# The command below and the transform input should match.\n",
        "! head ansoutput1*.txt"
      ]
    },
    {
      "attachments": {},
      "cell_type": "markdown",
      "metadata": {
        "id": "iie23Id-O33B"
      },
      "source": [
        "# 2. PTransforms\n",
        "\n",
        "Now that we have read in our code, we can now process our data to explore the text messages that could be classified as spam or non-spam (ham).\n",
        "\n",
        "In order to achieve this, we need to use `PTransforms`.\n",
        "\n",
        "A **`PTransform`** is any data processing operation that performs a processing function on one or more `PCollection`, outputting zero or more `PCollection`.\n",
        "\n",
        "Some PTransforms accept user-defined functions that apply custom logic, which you will learn in the *Advanced Transforms* notebook. The “P” stands for “parallel.”"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "vPhZG2GgPCW7"
      },
      "source": [
        "## 2.1 Map\n",
        "\n",
        "One feature to use for the classifier to distinguish spam SMS from ham SMS is to compare the distribution of common words between the two categories. To find the common words for the two categories, we want to perform a frequency count of each word in the data set."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "cR1YWyvQj1tm"
      },
      "source": [
        "First, because the data set is read line by line, let's clean up the `PCollection` so that the label and the SMS is separated.\n",
        "\n",
        "To do so, we will use the transform `Map`, which takes a **function** and **maps it** to **each element** of the collection and transforms a single input `a` to a single output `b`.\n",
        "\n",
        "In this case, we will use `beam.Map` which takes in a lambda function and uses regex to split the line into a two item list: [label, SMS]. The lambda function we will use is `lambda line: line.split(\"\\t\")`, which splits each element of the `PCollection` by tab and putting them into a list.\n"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "Xey-I5BFvCiH"
      },
      "source": [
        "Add a line of code between the `ReadFromText` and `WriteToText` transform that applies a `beam.Map` transform that takes in the function described above. Remember to add a short description for your transform!"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "hdrOsitSuj7z"
      },
      "outputs": [],
      "source": [
        "#@title Edit This Code\n",
        "import re\n",
        "\n",
        "inputs_pattern = 'data/SMSSpamCollection'\n",
        "\n",
        "pipeline = beam.Pipeline()\n",
        "\n",
        "outputs = (\n",
        "    pipeline\n",
        "      | 'Take in Dataset' >> beam.io.ReadFromText(inputs_pattern)\n",
        "      ...\n",
        "      | 'Write results' >> beam.io.WriteToText(\"output2\", file_name_suffix = \".txt\")\n",
        "      | 'Print the text file name' >> beam.Map(print)\n",
        ")\n",
        "\n",
        "pipeline.run()\n",
        "\n",
        "! head output2*.txt"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "cellView": "form",
        "id": "2k4e3j24wTzM"
      },
      "outputs": [],
      "source": [
        "#@title Answer\n",
        "import re\n",
        "\n",
        "inputs_pattern = 'data/SMSSpamCollection'\n",
        "\n",
        "pipeline = beam.Pipeline()\n",
        "\n",
        "outputs = (\n",
        "    pipeline\n",
        "      | 'Take in Dataset' >> beam.io.ReadFromText(inputs_pattern)\n",
        "      # ADDED\n",
        "      | 'Separate to list' >> beam.Map(lambda line: line.split(\"\\t\"))\n",
        "      | 'Write results' >> beam.io.WriteToText(\"ansoutput2\", file_name_suffix = \".txt\")\n",
        "      | 'Print the text file name' >> beam.Map(print)\n",
        ")\n",
        "\n",
        "pipeline.run()\n",
        "\n",
        "! head ansoutput2*.txt"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "ju8TXxBYwvzX"
      },
      "source": [
        "## 2.2 Filter\n",
        "\n",
        "Now that we have a list separating the label and the SMS, let's first focus on only counting words with the **spam** label. In order to process certain elements while igorning others, we want to filter out specific elements in a collection using the transform `Filter`.\n",
        "\n",
        "`beam.Filter` takes in a function that checks a single element a, and returns True to keep the element, or False to discard it.\n",
        "\n",
        "In this case, we want `Filter` to return true if the list contains the label **spam**.\n",
        "\n",
        "We will use a lambda function again for this example, but this time, you will write the lambda function yourself. Add a line of code after your `beam.Map` transform to only return a `PCollection` that only contains lists with the label **spam**."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "a72v9SQ0zb5u"
      },
      "outputs": [],
      "source": [
        "#@title Edit This Code\n",
        "import re\n",
        "\n",
        "inputs_pattern = 'data/SMSSpamCollection'\n",
        "\n",
        "pipeline = beam.Pipeline()\n",
        "\n",
        "outputs = (\n",
        "    pipeline\n",
        "      | 'Take in Dataset' >> beam.io.ReadFromText(inputs_pattern)\n",
        "      | 'Separate to list' >> beam.Map(lambda line: line.split(\"\\t\"))\n",
        "      ...\n",
        "      | 'Write results' >> beam.io.WriteToText(\"ansoutput3\", file_name_suffix = \".txt\")\n",
        "      | 'Print the text file name' >> beam.Map(print)\n",
        ")\n",
        "\n",
        "pipeline.run()\n",
        "\n",
        "! head ansoutput3*.txt"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "cellView": "form",
        "id": "BmvUeOztzCv0"
      },
      "outputs": [],
      "source": [
        "#@title Answer\n",
        "import re\n",
        "\n",
        "inputs_pattern = 'data/SMSSpamCollection'\n",
        "\n",
        "pipeline = beam.Pipeline()\n",
        "\n",
        "outputs = (\n",
        "    pipeline\n",
        "      | 'Take in Dataset' >> beam.io.ReadFromText(inputs_pattern)\n",
        "      | 'Separate to list' >> beam.Map(lambda line: line.split(\"\\t\"))\n",
        "      # ADDED\n",
        "      | 'Keep only spam' >> beam.Filter(lambda line: line[0] == \"spam\")\n",
        "      | 'Write results' >> beam.io.WriteToText(\"ansoutput3\", file_name_suffix = \".txt\")\n",
        "      | 'Print the text file name' >> beam.Map(print)\n",
        ")\n",
        "\n",
        "pipeline.run()\n",
        "\n",
        "! head ansoutput3*.txt"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "v4EcJw45zy6Y"
      },
      "source": [
        "## 2.3 FlatMap\n",
        "\n",
        "Now, that we know we only have SMS labelled spam, we now need to change the element such that instead of each element being a list containing the label and the SMS, each element is a word in the SMS.\n",
        "\n",
        "We can't use `Map`, since `Map` allows us to transform each individual element, but we can't change the number of elements with it.\n",
        "\n",
        "Instead, we want to map a function to each element of a collection. That function returns a list of output elements, so we would get a list of lists of elements. Then we want to flatten the list of lists into a single list.\n",
        "\n",
        "To do this, we will use `FlatMap`, which takes a **function** that transforms a single input `a` into an **iterable of outputs** `b`. But we get a **single collection** containing the outputs of all the elements. In this case, all these elements will be the words found in the SMS."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "3V3w2cz11S_8"
      },
      "source": [
        "Add a `FlatMap` transform that takes in the function `lambda line: re.findall(r\"[a-zA-Z']+\", line[1])` to your code below. The lambda function finds words by finding all elements in the SMS that match the specifications of the regex."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "USHHIPO91xDn"
      },
      "outputs": [],
      "source": [
        "#@title Edit This Code\n",
        "import re\n",
        "\n",
        "inputs_pattern = 'data/SMSSpamCollection'\n",
        "\n",
        "pipeline = beam.Pipeline()\n",
        "\n",
        "outputs = (\n",
        "    pipeline\n",
        "      | 'Take in Dataset' >> beam.io.ReadFromText(inputs_pattern)\n",
        "      | 'Separate to list' >> beam.Map(lambda line: line.split(\"\\t\"))\n",
        "      | 'Keep only spam' >> beam.Filter(lambda line: line[0] == \"spam\")\n",
        "      ...\n",
        "      | 'Write results' >> beam.io.WriteToText(\"ansoutput3\", file_name_suffix = \".txt\")\n",
        "      | 'Print the text file name' >> beam.Map(print)\n",
        ")\n",
        "\n",
        "pipeline.run()\n",
        "\n",
        "! head ansoutput3*.txt"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "cellView": "form",
        "id": "VFuRMIsN0Edn"
      },
      "outputs": [],
      "source": [
        "#@title Answer\n",
        "import re\n",
        "\n",
        "inputs_pattern = 'data/SMSSpamCollection'\n",
        "\n",
        "pipeline = beam.Pipeline()\n",
        "\n",
        "outputs = (\n",
        "    pipeline\n",
        "      | 'Take in Dataset' >> beam.io.ReadFromText(inputs_pattern)\n",
        "      | 'Separate to list' >> beam.Map(lambda line: line.split(\"\\t\"))\n",
        "      | 'Keep only spam' >> beam.Filter(lambda line: line[0] == \"spam\")\n",
        "      # ADDED\n",
        "      | 'Find words' >> beam.FlatMap(lambda line: re.findall(r\"[a-zA-Z']+\", line[1]))\n",
        "      | 'Write results' >> beam.io.WriteToText(\"ansoutput3\", file_name_suffix = \".txt\")\n",
        "      | 'Print the text file name' >> beam.Map(print)\n",
        ")\n",
        "\n",
        "pipeline.run()\n",
        "\n",
        "! head ansoutput3*.txt"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "BFJMyIyJ17lE"
      },
      "source": [
        "## 2.4 Combine\n",
        "\n",
        "Now that each word is one element, we have to count up the elements. To do that we can use [aggregation](https://beam.apache.org/documentation/transforms/python/overview/) transforms, specifically `CombinePerKey` in this instance which transforms an iterable of inputs a, and returns a single output a based on their key.\n",
        "\n",
        "Before using `CombinePerKey` however, we have to associate each word with a numerical value to then combine them.\n",
        "\n",
        "To do this, we add `| 'Pair words with 1' >> beam.Map(lambda word: (word, 1))` to the `Pipeline`, which associates each word with the numerical value 1.\n",
        "\n",
        "With each word assigned to a numerical value, we can now combine these numerical values to sum up all the counts of each word. Like the past transforms, `CombinePerKey` takes in a function and applies it to each element of the `PCollection`.\n",
        "\n",
        "However, instead of writing our own lambda function, we can use pass one of Beam's built-in function `sum` into `CombinePerKey`."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "Yw_3dfnzI2xA"
      },
      "outputs": [],
      "source": [
        "#@title Edit This Code\n",
        "import re\n",
        "\n",
        "inputs_pattern = 'data/SMSSpamCollection'\n",
        "\n",
        "pipeline = beam.Pipeline()\n",
        "\n",
        "outputs = (\n",
        "    pipeline\n",
        "      | 'Take in Dataset' >> beam.io.ReadFromText(inputs_pattern)\n",
        "      | 'Separate to list' >> beam.Map(lambda line: line.split(\"\\t\"))\n",
        "      | 'Keep only spam' >> beam.Filter(lambda line: line[0] == \"spam\")\n",
        "      | 'Find words' >> beam.FlatMap(lambda line: re.findall(r\"[a-zA-Z']+\", line[1]))\n",
        "      | 'Pair words with 1' >> beam.Map(lambda word: (word, 1))\n",
        "      ...\n",
        "      | 'Write results' >> beam.io.WriteToText(\"ansoutput4\", file_name_suffix = \".txt\")\n",
        "      | 'Print the text file name' >> beam.Map(print)\n",
        ")\n",
        "\n",
        "pipeline.run()\n",
        "\n",
        "! head ansoutput4*.txt"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "cellView": "form",
        "id": "qDyh9_faIkCo"
      },
      "outputs": [],
      "source": [
        "#@title Answer\n",
        "import re\n",
        "\n",
        "inputs_pattern = 'data/SMSSpamCollection'\n",
        "\n",
        "pipeline = beam.Pipeline()\n",
        "\n",
        "outputs = (\n",
        "    pipeline\n",
        "      | 'Take in Dataset' >> beam.io.ReadFromText(inputs_pattern)\n",
        "      | 'Separate to list' >> beam.Map(lambda line: line.split(\"\\t\"))\n",
        "      | 'Keep only spam' >> beam.Filter(lambda line: line[0] == \"spam\")\n",
        "      | 'Find words' >> beam.FlatMap(lambda line: re.findall(r\"[a-zA-Z']+\", line[1]))\n",
        "      | 'Pair words with 1' >> beam.Map(lambda word: (word, 1))\n",
        "      # ADDED\n",
        "      | 'Group and sum' >> beam.CombinePerKey(sum)\n",
        "      | 'Write results' >> beam.io.WriteToText(\"ansoutput4\", file_name_suffix = \".txt\")\n",
        "      | 'Print the text file name' >> beam.Map(print)\n",
        ")\n",
        "\n",
        "pipeline.run()\n",
        "\n",
        "! head ansoutput4*.txt"
      ]
    },
    {
      "attachments": {},
      "cell_type": "markdown",
      "metadata": {
        "id": "L9zNI4LMJHur"
      },
      "source": [
        "And we finished! Now that we have a count of all the words to gain better understanding about our dataset."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "BozCqbzUPItB"
      },
      "source": [
        "\n",
        "\n",
        "---\n",
        "\n",
        "\n",
        "\n",
        "---\n",
        "\n"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "Cv9508k4MeS4"
      },
      "source": [
        "# Full Spam Ham Apache Beam Example\n",
        "\n",
        "Below is a summary of all the code we performed for your convenience. Note you do not need to explicitly call run with the with statement."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "8iWWy3Y7Msxm"
      },
      "outputs": [],
      "source": [
        "!pip install --quiet apache-beam\n",
        "!mkdir -p data\n",
        "!gsutil cp gs://apachebeamdt/SMSSpamCollection data/"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "43oijI4BCAW1"
      },
      "outputs": [],
      "source": [
        "import apache_beam as beam\n",
        "import re\n",
        "\n",
        "inputs_pattern = 'data/SMSSpamCollection'\n",
        "outputs_prefix_ham = 'outputs/fullcodeham'\n",
        "outputs_prefix_spam = 'outputs/fullcodespam'\n",
        "\n",
        "# Ham Word Count\n",
        "with beam.Pipeline() as pipeline:\n",
        "     ham = (\n",
        "      pipeline\n",
        "      | 'Take in Dataset' >> beam.io.ReadFromText(inputs_pattern)\n",
        "      | 'Separate to list' >> beam.Map(lambda line: line.split(\"\\t\"))\n",
        "      | 'Keep only ham' >> beam.Filter(lambda line: line[0] == \"ham\")\n",
        "      | 'Find words' >> beam.FlatMap(lambda line: re.findall(r\"[a-zA-Z']+\", line[1]))\n",
        "      | 'Pair words with 1' >> beam.Map(lambda word: (word, 1))\n",
        "      | 'Group and sum' >> beam.CombinePerKey(sum)\n",
        "      | 'Format results' >> beam.Map(lambda word_c: str(word_c))\n",
        "      | 'Write results' >> beam.io.WriteToText(outputs_prefix_ham, file_name_suffix = \".txt\")\n",
        "    )\n",
        "\n",
        "# Spam Word Count\n",
        "with beam.Pipeline() as pipeline1:\n",
        "  spam = (\n",
        "    pipeline1\n",
        "    | 'Take in Dataset' >> beam.io.ReadFromText(inputs_pattern)\n",
        "    | 'Separate to list' >> beam.Map(lambda line: line.split(\"\\t\"))\n",
        "    | 'Filter out only spam' >> beam.Filter(lambda line: line[0] == \"spam\")\n",
        "    | 'Find words' >> beam.FlatMap(lambda line: re.findall(r\"[a-zA-Z']+\", line[1]))\n",
        "    | 'Pair words with 1' >> beam.Map(lambda word: (word, 1))\n",
        "    | 'Group and sum' >> beam.CombinePerKey(sum)\n",
        "    | 'Format results' >> beam.Map(lambda word_c: str(word_c))\n",
        "    | 'Write results' >> beam.io.WriteToText(outputs_prefix_spam, file_name_suffix = \".txt\")\n",
        "    )\n",
        "\n",
        "print('Ham Word Count Head')\n",
        "! head outputs/fullcodeham*.txt\n",
        "\n",
        "print('Spam Word Count Head')\n",
        "! head outputs/fullcodespam*.txt"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "FNg53x4fyXOO"
      },
      "source": [
        "**One more thing: you can also visualize the Beam pipelines!**\n",
        "\n",
        "Check [this example](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/interactive/examples/Interactive%20Beam%20Example.ipynb) to learn more about the interactive Beam."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "k5cYnOElwrm7"
      },
      "outputs": [],
      "source": [
        "import apache_beam.runners.interactive.interactive_beam as ib\n",
        "ib.show_graph(pipeline)"
      ]
    }
  ],
  "metadata": {
    "colab": {
      "provenance": [],
      "toc_visible": true
    },
    "kernelspec": {
      "display_name": "venv",
      "language": "python",
      "name": "python3"
    },
    "language_info": {
      "codemirror_mode": {
        "name": "ipython",
        "version": 3
      },
      "file_extension": ".py",
      "mimetype": "text/x-python",
      "name": "python",
      "nbconvert_exporter": "python",
      "pygments_lexer": "ipython3",
      "version": "3.9.13"
    },
    "vscode": {
      "interpreter": {
        "hash": "aab5fceeb08468f7e142944162550e82df74df803ff2eb1987d9526d4285522f"
      }
    }
  },
  "nbformat": 4,
  "nbformat_minor": 0
}
